#!/usr/bin/env python
# Copyright (C) 2012 Nanakos Chrysostomos - <cnanakos@ekt.gr>
# National Documentation Centre
# dPool Elastic Cluster - Distributed Image Converting System
# dPool is placed under the GNU General Public License, version 3 or later.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import ConfigParser
import json
import optparse
import os
import os.path
import pickle
import platform
import sys
import threading
import time
import traceback

from daemon import runner
import psutil
import setproctitle
import zmq
# dPool module
from dutils import DistillerLogger, ColorAttr, States, DistillerTailLog,Services

# Version banner handed to optparse (shown for --version).
version = """dPool Version: 0.1"""
# Help text handed to optparse; optparse replaces %prog with the program name.
# The command names listed here are the ones dispatched on in the
# ``__main__`` section at the bottom of this file.
usage = """usage: %prog [options] command [arg...]

commands:
  start     start the server daemon
  stop      stop the server daemon
  restart   restart the server daemon
  status    return server daemon status
  log	    watch server daemon logfile

Example session:
  %prog start     # starts daemon
  %prog status    # print daemon's status
  %prog log	  # watch daemon's logfile
  %prog stop      # stops daemon
  %prog restart   # restarts daemon"""


# Worker settings live in this file; the relative name is resolved against
# the process's current working directory.
_conffile = 'distiller_slave.conf'

# NOTE: RawConfigParser.read() skips files it cannot open, so a missing
# configuration file shows up as NoSectionError on the first get() below.
config = ConfigParser.RawConfigParser()
config.read(_conffile)

# Master host: the dispatcher feed is reached on port 5000 and the
# registration service on port 5001.
MASTER_HOSTNAME = config.get("MASTER", "Hostname")
SERVICE_NAME = 'distiller-worker'
SERVER_ENDPOINT = "tcp://%s:5000" % MASTER_HOSTNAME
MASTER_ENDPOINT = "tcp://%s:5001" % MASTER_HOSTNAME

# Endpoints of the queue and converter services and of this worker's own
# status server, plus the worker's PID and log files.
QUEUE_ENDPOINT = config.get("QUEUE", "Endpoint")
QUEUE_STATUS_ENDPOINT = config.get("QUEUE", "StatusEndpoint")
CONVERTER_ENDPOINT = config.get("CONVERTER", "Endpoint")
CONVERTER_STATUS_ENDPOINT = config.get("CONVERTER", "StatusEndpoint")
WORKER_STATUS_SERVER = config.get("WORKER", "StatusServer")
WORKER_PID_FILE = config.get("WORKER", "PidFile")
WORKER_LOG_FILE = config.get("WORKER", "LogFile")

# Repository directories whose presence is reported by the status threads.
MAIN_REPOSITORY = config.get("WORKER", "LocalRepository")
EXPORT_REPOSITORY = config.get("WORKER", "ExportRepository")
FAILED_REPOSITORY = config.get("WORKER", "FailedRepository")

# State shared between the monitoring thread (writer) and the status
# server thread (reader); every access is expected to hold ``lock``.
Services_Status = {
    "QUEUE": None,
    "CONVERTER": None,
    "WORKER": "RUNNING",
    "BUSY_STATUS": None,
    "REPOSITORIES": None,
}
Queues_Size = {
    "PNGQUEUE": None,
    "JP2QUEUE": None,
}

lock = threading.BoundedSemaphore()

class DistillerWorker(object):
    """Forward jobs addressed to this host from the dispatcher to the queue.

    The worker subscribes to the dispatcher endpoint, registers itself with
    the master server, and then pushes the "URL" of every received message
    whose "SERVER" field equals this machine's node name to the queue
    endpoint.
    """

    def __init__(self, dispatcher_endpoint=SERVER_ENDPOINT,
                 queue_ipc_endpoint=QUEUE_ENDPOINT,
                 master_endpoint=MASTER_ENDPOINT):
        """Remember the endpoints and open the worker log.

        dispatcher_endpoint -- address the SUB socket connects to.
        queue_ipc_endpoint  -- address the PUSH socket binds to.
        master_endpoint     -- address of the registration service.
        """
        self.dispatcher_endpoint = dispatcher_endpoint
        self.queue_ipc_endpoint = queue_ipc_endpoint
        self.master_endpoint = master_endpoint
        self.Log = DistillerLogger(WORKER_LOG_FILE, "DistillerWorker")

    def registerWorkerService(self):
        """Register this node with the master server.

        Sends States.D_REGISTER and expects States.D_TRY, then sends a JSON
        description of the worker and expects States.D_REGISTERED.  Requires
        createContext() to have created self.context.

        Exits the process with status 2 when the master answers the first
        request with anything other than States.D_TRY, or answers the second
        one with States.D_REG_ERROR.  Both recv() calls block until the
        master replies.
        """
        register_service = self.context.socket(zmq.REQ)
        try:
            register_service.connect(self.master_endpoint)
            register_service.send(States.D_REGISTER)
            msg = register_service.recv()
            if msg != States.D_TRY:
                self.Log.logger.info("Message error.Cannot register to master server.")
                sys.exit(2)

            register_service.send(json.dumps(
                {"REGISTER_SERVICE": "Worker", "SERVER": platform.node()}))
            reg_status = register_service.recv()
            if reg_status == States.D_REGISTERED:
                self.Log.logger.info("Registered to master server")
            elif reg_status == States.D_REG_ERROR:
                self.Log.logger.info("Cannot register to master server")
                sys.exit(2)
        finally:
            # Close the socket on every path, including the sys.exit() ones.
            register_service.close()

    def createContext(self):
        """Create the ZeroMQ context and sockets, then register with the master."""
        self.context = zmq.Context()
        # Empty subscription prefix: receive every published message.
        self.worker = self.context.socket(zmq.SUB)
        self.worker.setsockopt(zmq.SUBSCRIBE, '')
        self.worker.setsockopt(zmq.LINGER, 0)
        self.worker.connect(self.dispatcher_endpoint)
        self.queue = self.context.socket(zmq.PUSH)
        self.queue.bind(self.queue_ipc_endpoint)
        self.registerWorkerService()

    def serve_forever(self):
        """Start the status threads and relay this node's jobs forever.

        Starts DistillerCheckServices and DistillerWorkerStatus as daemon
        threads sharing the module-level Services_Status, Queues_Size and
        lock, then loops on the dispatcher socket.  Errors in one iteration
        are logged and the loop continues.  Never returns.
        """
        statusPoller = DistillerCheckServices(Services_Status, Queues_Size, lock)
        statusPoller.setDaemon(True)
        statusPoller.start()
        statusPublisher = DistillerWorkerStatus(Services_Status, Queues_Size, lock)
        statusPublisher.setDaemon(True)
        statusPublisher.start()
        node_platform = platform.node()
        while True:
            try:
                # NOTE(security): recv_pyobj() unpickles whatever arrives on
                # the dispatcher endpoint; only connect to a trusted master.
                unfolded_msg = self.worker.recv_pyobj()
                if unfolded_msg["SERVER"] == node_platform:
                    self.queue.send_unicode(unfolded_msg["URL"])
                    self.Log.logger.info("Got %s" % unfolded_msg["URL"])
            except zmq.ZMQError:
                self.Log.logger.error("ZMQError in send/receive worker loop")
                self.Log.logger.error("Traceback %s" % traceback.format_exc())
            except Exception as e:
                self.Log.logger.error("Unhandled error: %s" % e)
            # No cleanup clause: the message name is simply rebound on the
            # next iteration, so a failed receive can no longer trip over an
            # unbound local and terminate the loop.

class DistillerWorkerStatus(threading.Thread):
    """Thread answering status requests on WORKER_STATUS_SERVER.

    Recognised requests:
      States.D_GETSTATUS -- reply with the shared Services_Status dict (JSON)
      States.D_GETQUEUES -- reply with the shared Queues_Size dict (JSON)
      States.D_GETSTATS  -- reply with host statistics from psutil (JSON)
    Any other request is answered with States.D_ERROR.
    """

    def __init__(self, Services_Status, Queues_Size, lock):
        """Keep references to the shared dicts and the lock guarding them."""
        self.Services_Status = Services_Status
        self.Queues_Size = Queues_Size
        self.lock = lock
        threading.Thread.__init__(self)

    def run(self):
        """Bind the REP socket and serve requests until the process exits."""
        self.context = zmq.Context()
        self.status = self.context.socket(zmq.REP)
        self.status.bind(WORKER_STATUS_SERVER)
        self.poller = zmq.Poller()
        self.poller.register(self.status, zmq.POLLIN)
        self.Log = DistillerLogger(WORKER_LOG_FILE, "DistillerWorkerStatus")
        while True:
            # Wake up every 100 ms even when no request is pending.
            if not self.poller.poll(100):
                continue
            try:
                self._answer(self.status.recv())
            except Exception as e:
                # NOTE(review): if building a reply fails before anything is
                # sent, the REP socket is left waiting for a send; consider
                # sending an error reply here as well.
                self.Log.logger.error(e)

    def _answer(self, msg):
        """Send the reply that corresponds to the request *msg*."""
        if msg == States.D_GETSTATUS:
            with self.lock:
                self.status.send_json(self.Services_Status)
        elif msg == States.D_GETQUEUES:
            with self.lock:
                self.status.send_json(self.Queues_Size)
        elif msg == States.D_GETSTATS:
            self.status.send_json(self._collect_stats())
        else:
            # Unrecognised request: still answer, because a REP socket
            # cannot receive again until it has replied.
            self.status.send(States.D_ERROR)

    def _collect_stats(self):
        """Return a dict of CPU, memory, disk and network figures from psutil."""
        stats = {}
        stats['CPU_PERCENT'] = psutil.cpu_percent(percpu=True)
        stats['NUM_CPUS'] = psutil.NUM_CPUS
        stats['PHYMEM_USAGE'] = psutil.phymem_usage()
        stats['VIRTMEM_USAGE'] = psutil.virtmem_usage()
        stats['DISK_PARTITIONS'] = psutil.disk_partitions()
        stats['DISK_USAGE'] = psutil.disk_usage('/')
        # Per-repository usage is reported only when all three directories
        # exist; otherwise every repository entry is None.
        repositories = (MAIN_REPOSITORY, FAILED_REPOSITORY, EXPORT_REPOSITORY)
        all_present = all(os.path.exists(path) for path in repositories)
        for path in repositories:
            stats[path] = psutil.disk_usage(path) if all_present else None
        stats['DISK_IO_COUNTERS'] = psutil.disk_io_counters()
        stats['NETWORK_IO_COUNTERS'] = psutil.network_io_counters(pernic=True)
        stats['REPOSITORY_LIST'] = [MAIN_REPOSITORY, EXPORT_REPOSITORY, FAILED_REPOSITORY]
        return stats

class DistillerCheckServices(threading.Thread):
    """Thread that polls the queue and converter services and records their state.

    Roughly every half second it sends States.D_GSTATUS to the QUEUE and
    CONVERTER status endpoints and stores the outcome in Services_Status:
    D_RUNNING, D_ERROR, D_UNKNOWN, or D_DOWN when no reply arrives in time.
    When both services are running it also asks the converter for its busy
    state and queue sizes, and records whether the repository directories
    exist.  All writes to the shared dicts are made while holding self.lock.
    """

    # Milliseconds to wait for a service to answer a request.
    _REPLY_TIMEOUT_MS = 1000

    def __init__(self, Services_Status, Queues_Size, lock):
        """Keep references to the shared dicts and the lock guarding them."""
        self.Services_Status = Services_Status
        self.Queues_Size = Queues_Size
        self.lock = lock
        threading.Thread.__init__(self)

    def run(self):
        """Poll the services forever, logging any error of a round."""
        self.Log = DistillerLogger(WORKER_LOG_FILE, "DistillerCheckServices")
        self.Log.logger.info(self.Services_Status)
        self.context = zmq.Context()
        # QUEUE is checked first: the converter details are only fetched
        # when the queue has already been seen running.
        endpoints = (
            ("QUEUE", QUEUE_STATUS_ENDPOINT),
            ("CONVERTER", CONVERTER_STATUS_ENDPOINT),
        )
        while True:
            try:
                for service, endpoint in endpoints:
                    self._check_service(service, endpoint)
            except Exception as e:
                self.Log.logger.error("Error %s" % e)
            # Sleep outside the try block so a failing round cannot turn
            # the loop into a busy spin.
            time.sleep(0.5)

    def _set_status(self, key, value):
        """Store *value* under *key* in Services_Status while holding the lock."""
        with self.lock:
            self.Services_Status[key] = value

    def _recv_with_timeout(self, sock, poller):
        """Return the next raw reply on *sock* or raise IOError on timeout."""
        if not poller.poll(self._REPLY_TIMEOUT_MS):
            raise IOError("No reply from CONVERTER within %d ms" % self._REPLY_TIMEOUT_MS)
        return sock.recv()

    def _check_service(self, service, endpoint):
        """Probe one service and record its state in Services_Status."""
        sock = self.context.socket(zmq.REQ)
        try:
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect(endpoint)
            sock.send(States.D_GSTATUS)
            poller = zmq.Poller()
            poller.register(sock, zmq.POLLIN)
            if not poller.poll(self._REPLY_TIMEOUT_MS):
                self._set_status(service, States.D_DOWN)
                return
            msg_status = sock.recv()
        finally:
            # Always release the probe socket, even if the probe failed.
            sock.close()

        if msg_status == States.D_OK:
            self._set_status(service, States.D_RUNNING)
            if service == "CONVERTER" and self.Services_Status["QUEUE"] == States.D_RUNNING:
                self._update_converter_details(endpoint)
                self._update_repositories()
        elif msg_status == States.D_ERROR:
            self._set_status(service, States.D_ERROR)
        else:
            self._set_status(service, States.D_UNKNOWN)

    def _update_converter_details(self, endpoint):
        """Fetch the converter's busy state and queue sizes into the shared dicts."""
        sock = self.context.socket(zmq.REQ)
        try:
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect(endpoint)
            poller = zmq.Poller()
            poller.register(sock, zmq.POLLIN)

            sock.send(States.D_GBUSY)
            busy_status = self._recv_with_timeout(sock, poller)
            if busy_status == States.D_DBUSY:
                self._set_status("BUSY_STATUS", States.D_DBUSY)
            elif busy_status == States.D_NBUSY:
                self._set_status("BUSY_STATUS", States.D_NBUSY)

            sock.send(States.D_GETQUEUES)
            # NOTE(security): the queue sizes arrive pickled; the converter
            # status endpoint must be a trusted peer.
            queues_size = pickle.loads(self._recv_with_timeout(sock, poller))
            with self.lock:
                self.Queues_Size["PNGQUEUE"] = queues_size["PNGQUEUE"]
                self.Queues_Size["JP2QUEUE"] = queues_size["JP2QUEUE"]
        finally:
            sock.close()

    def _update_repositories(self):
        """Record whether all three repository directories currently exist."""
        repositories = (MAIN_REPOSITORY, FAILED_REPOSITORY, EXPORT_REPOSITORY)
        if all(os.path.exists(path) for path in repositories):
            self._set_status(Services.D_REPOSITORIES, States.D_OK)
        else:
            self._set_status(Services.D_REPOSITORIES, States.D_NOK)
			
class DistillerWorkerServer(object):
    """Application object handed to ``runner.DaemonRunner``.

    It carries the stream paths and PID-file settings as attributes and
    provides ``run()``, which starts the worker.
    """

    def __init__(self):
        # All three standard streams point at /dev/null.
        self.stdin_path = self.stdout_path = self.stderr_path = '/dev/null'
        self.pidfile_path = WORKER_PID_FILE
        self.pidfile_timeout = 5
        self.prevent_core = False

    def run(self):
        """Create a DistillerWorker, set up its sockets and serve forever."""
        worker = DistillerWorker()
        worker.createContext()
        worker.serve_forever()


if __name__ == "__main__":
    # Command-line entry point: dispatch on the first positional argument
    # (start | stop | restart | status | log); anything else prints the help
    # text and exits with -1.

    def _new_runner():
        """Return a DaemonRunner wrapping a fresh DistillerWorkerServer."""
        return runner.DaemonRunner(DistillerWorkerServer())

    optparser = optparse.OptionParser(usage=usage, version=version)
    (options, args) = optparser.parse_args()

    setproctitle.setproctitle(SERVICE_NAME)

    if len(args) == 0:
        optparser.print_help()
        sys.exit(-1)

    command = args[0]
    if command == 'start':
        # The PID file is used as the "already running" marker.
        if not os.path.exists(WORKER_PID_FILE):
            daemon_runner = _new_runner()
            daemon_runner.parse_args()
            daemon_runner._start()
        else:
            print("PID File exists: %s" % WORKER_PID_FILE)
    elif command == 'stop':
        if os.path.exists(WORKER_PID_FILE):
            _new_runner()._stop()
        else:
            print("Worker slave server is not running.")
    elif command == 'restart':
        # Without a PID file there is nothing to stop, so just start.
        if os.path.exists(WORKER_PID_FILE):
            _new_runner()._restart()
        else:
            _new_runner()._start()
    elif command == 'status':
        if os.path.exists(WORKER_PID_FILE):
            # The with-block closes the file even if reading fails, and
            # readline() copes with an empty PID file instead of raising
            # IndexError.
            with open(WORKER_PID_FILE, 'r') as fd:
                pid = fd.readline().strip()
            print("Worker slave server is running, PID: %s" % pid)
        else:
            print("Worker slave server is not running")
    elif command == 'log':
        log = DistillerTailLog(WORKER_LOG_FILE)
        log.follow()
    else:
        optparser.print_help()
        sys.exit(-1)
